package com.atuguigu.flink.Day05;

import com.atuguigu.flink.Day01.Singlesensor.SensorSource;
import com.atuguigu.flink.sensor.SendsorReading;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * State example: shows how a {@link KeyedProcessFunction} can keep values in
 * Flink keyed {@link MapState}.
 *
 * <p>The job reads from {@link SensorSource}, keys the stream by the reading's
 * {@code id}, writes each reading's {@code temperture} into a map state entry
 * under that id, reads the entry back, and prints the resulting string.
 */
public class Example6 {

    /**
     * Builds the pipeline and runs it.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job fails while executing
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Run every operator with a single parallel instance.
        env.setParallelism(1);

        env.addSource(new SensorSource())
                .keyBy(reading -> reading.id)
                .process(new MapStateEchoFunction())
                .print();

        env.execute();
    }

    /**
     * Stores the temperature of each incoming reading in map state and emits
     * the value that was just stored.
     */
    private static class MapStateEchoFunction extends KeyedProcessFunction<String, SendsorReading, String> {

        /** Map state handle; entries are written as id -> temperature. */
        private MapState<String, Double> temperatureById;

        /**
         * Obtains the map state handle named {@code "map-state"} from the
         * runtime context.
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            MapStateDescriptor<String, Double> descriptor =
                    new MapStateDescriptor<>("map-state", Types.STRING, Types.DOUBLE);
            temperatureById = getRuntimeContext().getMapState(descriptor);
        }

        /**
         * Writes the reading's temperature under its id, then reads the entry
         * back from state and emits it as a string.
         */
        @Override
        public void processElement(SendsorReading value, Context ctx, Collector<String> out) throws Exception {
            String sensorId = value.id;
            temperatureById.put(sensorId, value.temperture);

            // Read back through the state handle so the emitted value comes from state.
            Double stored = temperatureById.get(sensorId);
            out.collect("key:" + sensorId + "value" + stored);
        }
    }
}
